package com.kvn.dal.core;

import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerUtils;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.triggers.CronTriggerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.CollectionUtils;

import com.kvn.dal.common.PropertyValueResolverUtil;
import com.kvn.dal.core.dao.IJobInfoDao;
import com.kvn.dal.core.dao.IJobLogDao;
import com.kvn.dal.core.exception.SchedulerErrorCode;
import com.kvn.dal.core.pojo.JobInfo;
import com.kvn.dal.core.pojo.JobLog;
import com.kvn.dal.core.pojo.enums.JobStatus;
import com.kvn.dal.core.task.ExecutableTask;
import com.kvn.dal.core.task.ExecutableTaskDeligate;
import com.kvn.dal.core.task.KvnJobFactory;
import com.kvn.dal.log.Log;

/**
 * 分布式调度引导类
 * Created by wzy on 2017/5/24.
 */
public class JobBootstrap implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(JobBootstrap.class);
    /**
     * minInterval: job 执行时的最小时间间隔（单位：秒）；（Note:只对globalSingle=true时生效）<br/>
     * 确保在多进程情况下，容器启动时间不同，或者服务器时钟不同，在指定的job执行的最小时间间隔内，只有集群中的一台机能执行调度任务。
     */
    private int minInterval = 300; // 默认5min

    public void setMinInterval(int minInterval) {
        this.minInterval = minInterval;
    }

    @Resource
    private ApplicationContext applicationContext;
    @Resource
    private IJobLogDao jobLogDao;
    @Resource
    private IJobInfoDao jobInfoDao;
    @Resource
    private PropertyValueResolverUtil propertyValueResolverUtil;

    public static final Map<String, JobLog> JOB_INFO_MAP = new HashMap<>();

    @Override
    public void afterPropertiesSet() throws Exception {
        // load tasks
        loadTasks();

        // 打印task列表日志
        printTasks();

        // start quartz
        startQuartz();

    }

    private void printTasks() {
        logger.info("************************************ Registed Tasks list ************************************");
        for (JobLog job : JOB_INFO_MAP.values()) {
            logger.info("* jobCoe:{}, corn:{}, isGlobalSingle:{}, desc:{} *", job.getCode(), job.getCron(), job.isGlobalSingle(),
                    job.getDescription());
        }
        logger.info("**********************************************************************************************");
    }

    private void startQuartz() throws Exception {
        // 通过schedulerFactory获取一个调度器
        SchedulerFactory schedulerfactory = new StdSchedulerFactory();
        KvnJobFactory jobFactory = new KvnJobFactory();

        for (JobLog jobLog : JOB_INFO_MAP.values()) {
            Scheduler scheduler = null;
            try {
                // 通过schedulerFactory获取一个调度器
                scheduler = schedulerfactory.getScheduler();
                scheduler.setJobFactory(jobFactory);

                // 创建jobDetail实例，绑定Job实现类， 指明job的名称，所在组的名称，以及绑定job类。 name,group ?? TODO
                JobDetail job = JobBuilder.newJob(jobLog.getExecutableTaskClass()).withIdentity(jobLog.getCode(), "group_" + jobLog.getCode())
                        .build();

                // 定义调度触发规则,
                // 使用simpleTrigger规则（cornTrigger）（http://cron.qqe2.com/ 生成corn）
                Trigger trigger = TriggerBuilder.newTrigger().withIdentity("simpleTrigger_" + jobLog.getCode(), "triggerGroup_" + jobLog.getCode())
                        .withSchedule(CronScheduleBuilder.cronSchedule(jobLog.getCron())).startNow().build();

                // 把作业和触发器注册到任务调度中
                scheduler.scheduleJob(job, trigger);

                // 启动调度
                System.out.println(DateTime.now() + "------quartz start----");
                scheduler.start();

            } catch (Exception e) {
                logger.error("[jobCode={}, corn={}] 启动异常....ERROR==>{}", jobLog.getCode(), jobLog.getCron(), e);
                throw e;
            }
        }

    }

    private void loadTasks() throws ParseException {
        final String logPrifix = "[jobCode={}, corn={}, globalSingle={}]";

        // 自动扫描
        Map<String, ExecutableTask> beanMap = applicationContext.getBeansOfType(ExecutableTask.class);
        for (ExecutableTask task : beanMap.values()) {
            final JobLog job = new JobLog();
            final TimedTask tt = task.getClass().getAnnotation(TimedTask.class);
            if (tt == null) {
                throw SchedulerErrorCode.MISS_TIMEDTASK_ANNOTATION.exp(task.getClass().getName());
            }
            job.setCode(task.getClass().getName()); // 使用类名做jobCode
            String cron = resolveCron(tt.cron());
            job.setCron(cron);
            job.setGlobalSingle(tt.isGlobalSingle());
            job.setDescription(tt.desc());
            job.setExecuteType("system");
            // 决断是否是一天多次执行的任务，同时做 corn 预检查
            final boolean isRateLessThan1Day = isRateLessThan1Day(job);
            job.setTask(new ExecutableTaskDeligate(task) {
                @Override
                protected void doAfter(JobExecutionContext context, final JobLog curJobLog) {
                    logger.info(logPrifix + "执行成功，更新jobInfo状态为SUCCESS.....", job.getCode(), job.getCron(), job.isGlobalSingle());
                    jobLogDao.update(curJobLog.getId(), JobStatus.SUCCESS, JobStatus.PROCESSING, null);
                }

                @Override
                protected void doThrow(Exception e, final JobLog curJobLog) {
                    logger.info(logPrifix + "执行失败，更新jobInfo状态为FAIL.....", job.getCode(), job.getCron(), job.isGlobalSingle());
                    String errMsg = e.getMessage().length() < 100 ? e.getMessage() : e.getMessage().substring(0, 100);
                    jobLogDao.update(curJobLog.getId(), JobStatus.FAIL, JobStatus.PROCESSING, errMsg);
                }

                @Override
                protected void doBefore(JobExecutionContext context, final JobLog curJobLog) {
                    logger.info(logPrifix + " PREPARE.....", job.getCode(), job.getCron(), job.isGlobalSingle());
                    // 初始化jobInfo
                    JobInfo jobInfo = initJobInfo(job);

                    try {

                        if (tt.isGlobalSingle()) { /** 分布式集群job，单台机运行 */
                            handleGlobalSingleJob(job, isRateLessThan1Day);
                        } else { /** 多台机运行 */
                            logger.info("[jobCode={}]在多台机执行，直接执行.....", job.getCode());
                            jobLogDao.add(job);
                        }

                    } finally {
                        // 绑定jobLog到curJobLog，作为上下文传递
                        bindCurJobInfo(job, curJobLog);
                    }

                }

                private void bindCurJobInfo(JobLog jobLog, final JobLog curJobLog) {
                    curJobLog.bindJobInfo(jobLog);
                }
            });
            job.setExecutableTaskClass(task.getClass());
            JOB_INFO_MAP.put(task.getClass().getName(), job);
        }
    }
    
    private String resolveCron(String cron) {
		if(cron.matches("\\$\\{.+\\}")){
			return propertyValueResolverUtil.resolve(cron);
		}
		return cron;
	}

	/**
     * Note: 使用 【悲观锁 + JobStatusCheck + TimeLimit】 实现在多线程与多进程（主要是多进程）环境下，一个job在运行过程中，只会有一台机在执行job
     */
    private void handleGlobalSingleJob(JobLog job, boolean isRateLessThan1Day) {
        // 手动开启事务
        //1.获取事务控制管理器
        DataSourceTransactionManager transactionManager = applicationContext.getBean(DataSourceTransactionManager.class);
        //2.获取事务定义
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        //3.设置事务隔离级别，开启新事务
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        //4.获得事务状态
        TransactionStatus transactionStatus = transactionManager.getTransaction(def);

        // I. 悲观锁
        dbLock(job);

        try {
            JobLog lastJobLog = jobLogDao.getLastJobLog(job.getCode());
            if (lastJobLog == null) { // 新job首次部署,直接执行
                logger.info("[jobCode={}, corn={}] job首次运行，添加第一条jobLog.....", job.getCode(), job.getCron());
                jobLogDao.add(job);
                return;
            }

            // II. job状态检查
            if (JobStatus.PROCESSING == lastJobLog.getJobStatus()) {
                throw SchedulerErrorCode.JOB_RUNING.exp(); // 流程终止
            }

            Date dbCurDate = jobLogDao.getCurTime();

            // III. TimeLimit 检查
            if (isRateLessThan1Day) { // 执行频率小于1天，即1天多次运行
                boolean canExecute = (dbCurDate.getTime() - lastJobLog.getStartTime().getTime()) / 1000 >= minInterval;
                if (canExecute) {
                    jobLogDao.add(job); // jobLog.status = PROCESSING
                } else {
                    logger.info("[jobCode={}]已经执行过.....本次执行超过最小执行频率minInterval={}忽略......", job.getCode(), minInterval);
                    throw SchedulerErrorCode.JOB_HAS_RUN.exp(); // 流程终止
                }
            } else { // 执行频率大于等于1天，即1天最多运行1次
                boolean todayHasRun = new DateTime(dbCurDate).dayOfYear().get() == new DateTime(lastJobLog.getStartTime()).dayOfYear()
                        .get();
                if (todayHasRun) {
                    logger.info("[jobCode={}, corn={}]今天已经执行过.....本次执行忽略......", job.getCode(), job.getCron());
                    throw SchedulerErrorCode.JOB_HAS_RUN.exp(); // 流程终止
                } else {
                    jobLogDao.add(job);
                }
            }
        } catch (Exception e) {
            logger.info("[jobCode={}, corn={}]运行前失败......ERROR==>{}", job.getCode(), job.getCron(), e);
            transactionManager.rollback(transactionStatus);
            throw e; // 流程终止
        } finally {
            // 提交事务
            if (!transactionStatus.isCompleted()) {
                transactionManager.commit(transactionStatus);
            }
        }
    }


    /**
     * select for update 上锁 方法用事务，实现基于DB的分布式锁
     *
     * @param job
     */
    private void dbLock(JobLog job) {
        JobInfo jobInfo = jobInfoDao.select4update(job.getCode());
        if (jobInfo == null) { // job首次部署
            logger.info("[jobCode={}, corn={}] job首次部署，添加jobInfo.....", job.getCode(), job.getCron());
            jobInfo = new JobInfo();
            jobInfo.setCode(job.getCode());
            jobInfo.setCron(job.getCron());
            jobInfo.setDescription(job.getDescription());
            jobInfo.setGlobalSingle(job.isGlobalSingle());
            jobInfoDao.add(jobInfo); // 事务隔离级别及可见性
            jobInfoDao.select4update(job.getCode());
        }
    }

    /**
     * 计算job是否是一天多次运行的
     *
     * @param job
     * @return
     * @throws ParseException
     */
    private boolean isRateLessThan1Day(JobLog job) throws ParseException {
        CronTriggerImpl cronTriggerImpl = new CronTriggerImpl();
        try {
            cronTriggerImpl.setCronExpression(job.getCron());
        } catch (ParseException e) {
            logger.info("[jobCode={}, corn={}] corn表达式错误，解析失败......ERROR==>{}", job.getCode(), job.getCron(), e);
            throw e;
        }
        Date now = new Date();
        Date nowAdd1 = DateTime.now().plusDays(1).toDate();
        List<Date> dates = TriggerUtils.computeFireTimesBetween(cronTriggerImpl, null, now, nowAdd1);// 这个是重点，一行代码搞定~~
        if (CollectionUtils.isEmpty(dates) || dates.size() <= 1) {
            System.out.println(job.getCode() + "," + job.getCron() + "-----------一天执行0-1次");
            return false;
        }
        System.out.println(job.getCode() + "," + job.getCron() + "-----------一天执行>1次");
        // 对单台执行的job执行cron表达式预检查
        precheckCron(job, dates);
        return true;
    }

    /**
     * 检查globalSingle=true的job的cron表达式是否合理
     * @param job
     * @param dates
     */
    private void precheckCron(JobLog job, List<Date> dates) {
		if(job.isGlobalSingle() == false){
			return;
		}
		boolean isCronValid = new DateTime(dates.get(0)).plusSeconds(minInterval).toDate().compareTo(dates.get(1)) <= 0;
		if(!isCronValid){
			logger.warn("[jobCode={}, corn={}] cron表达式不在合理的区间，job的执行间隔小于 job 执行时的最小时间间隔[minInterval:{}s]", job.getCode(), job.getCron(), minInterval);
		}
	}

	/**
     * 初始化jobInfo信息
     *
     * @param job
     * @return
     */
    private JobInfo initJobInfo(JobLog job) {
        JobInfo jobInfo = jobInfoDao.selectOne(job.getCode());
        if (jobInfo == null) { // job首次部署
            logger.info("[jobCode={}, corn={}] job首次部署，添加jobInfo.....", job.getCode(), job.getCron());
            jobInfo = new JobInfo();
            jobInfo.setCode(job.getCode());
            jobInfo.setCron(job.getCron());
            jobInfo.setDescription(job.getDescription());
            try {
                jobInfoDao.add(jobInfo);
            } catch (DuplicateKeyException e) {
                if (job.isGlobalSingle()) {
                    logger.warn("[jobCode={}, corn={}] job在其他机器上执行，本机执行结束.....", job.getCode(), job.getCron());
                    throw SchedulerErrorCode.JOB_RUNING.exp();
                }
            }
        }

        return jobInfo;
    }

    public static void main(String[] args) {
//		int day1 = new DateTime(new Date()).dayOfYear().get();
//		int day2 = new DateTime(new Date("2017/05/26")).dayOfYear().get();
//		System.out.println("day = [" + day1 + "," + day2 + "]");
//        Exception e = new RuntimeException("异常xxxxxxx");
//        System.out.println(e.toString());
//        System.out.println(e.getMessage());
//    	long d = DateUtils.getFragmentInSeconds(new Date(), Calendar.SECOND );
//    	System.out.println(new Date(d));
    }

}
